# ---
# title: Social Analytics Dashboard
# description: Build a social media analytics dashboard using Prefect Assets, ATProto/Bluesky APIs, dbt transformations, and Streamlit visualization.
# icon: chart-bar
# dependencies: ["prefect", "prefect-aws"]
# cmd: ["python", "atproto_dashboard_with_prefect_assets.py"]
# tags: [assets, social-media, analytics, dbt, streamlit, atproto, bluesky]
# github_url: https://github.com/zzstoatzz/atproto-dashboard
# draft: false
# order: 4
# ---
#
# **Build data pipelines with Prefect Assets – declarative, dependency-aware, and observable.**
#
# This example demonstrates how to use **Prefect Assets** to build a social media analytics pipeline.
# The full implementation with [ATProto](https://atproto.com) integration, [dbt](https://www.getdbt.com) transformations, and a [Streamlit](https://streamlit.io)
# dashboard is available at: https://github.com/zzstoatzz/atproto-dashboard
#
# ## Key Prefect Features
#
# * **`@materialize` decorator** – Transform functions into versioned, cacheable data assets
# * **Automatic dependency tracking** – Prefect infers dependencies from function parameters
# * **S3-backed assets** – Store assets directly in S3 with built-in versioning
# * **Artifact creation** – Generate rich UI artifacts for observability
# * **Flow orchestration** – Coordinate asset materialization with retries and scheduling
#
# ## The Pattern: Asset-Based Data Pipelines
#
# Instead of manually managing data dependencies and storage, you:
# 1. Define assets with `@materialize` and unique keys (e.g., S3 paths)
# 2. Declare dependencies via function parameters or `asset_deps`
# 3. Let Prefect handle execution order, caching, and storage
# 4. Get automatic lineage tracking and observability
#
# ## Running This Example
#
# This simplified example demonstrates the core patterns. For the complete implementation:
# ```bash
# git clone https://github.com/zzstoatzz/atproto-dashboard
# cd atproto-dashboard
# # Follow README for setup and configuration
# ```
#
# ## Core Pattern: Define Assets and Dependencies
#
# Assets represent data products in your pipeline. Each asset has:
# - A unique key (often an S3 path or other storage location)
# - A materialization function decorated with `@materialize`
# - Dependencies (automatically tracked via function parameters)
#
# Define assets with descriptive keys

import json
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any

from prefect import flow
from prefect.artifacts import create_markdown_artifact
from prefect.assets import Asset, materialize

# Asset handles passed to the @materialize decorators in this file.
# Each key is a URI-style string under the "pipeline://" scheme.
raw_data_asset = Asset(key="pipeline://raw_data")
processed_data_asset = Asset(key="pipeline://processed_data")
analytics_asset = Asset(key="pipeline://analytics")

# ### Step 1: Fetch Raw Data
#
# The first asset fetches data from an external source. In the [full implementation](https://github.com/zzstoatzz/atproto-dashboard),
# this connects to the ATProto/Bluesky API to fetch social media data.


@materialize(raw_data_asset)
def fetch_raw_data() -> dict[str, Any]:
    """Return a small hard-coded payload standing in for an external fetch.

    Returns:
        A dict with ``items`` (three placeholder strings), ``fetched_at``
        (the current UTC time as an ISO-8601 string) and ``count`` (3).
    """
    print("Fetching raw data...")

    placeholder_items = ["item1", "item2", "item3"]
    fetched_stamp = datetime.now(timezone.utc).isoformat()

    payload: dict[str, Any] = dict(
        items=placeholder_items,
        fetched_at=fetched_stamp,
        count=3,
    )

    print(f"✓ Fetched {payload['count']} items")
    return payload


# ### Step 2: Process the Data
#
# This asset demonstrates **automatic dependency tracking**. By accepting `raw_data` as a parameter,
# Prefect knows this asset depends on `raw_data_asset` and ensures it's materialized first.
#
# In production, this would store data to S3 with partitioning. Here we use local storage for simplicity.


@materialize(processed_data_asset)
def process_data(raw_data: dict[str, Any]) -> dict[str, Any]:
    """Upper-case the raw items and write the result to ``./data/processed.json``.

    Args:
        raw_data: Mapping that provides an ``items`` iterable of objects with
            an ``upper()`` method and a ``count`` value.

    Returns:
        A dict with ``items`` (the upper-cased entries), ``processed_at``
        (the current UTC time as an ISO-8601 string) and ``source_count``
        (copied from ``raw_data["count"]``).
    """
    print(f"Processing {raw_data['count']} items...")

    shouted = [entry.upper() for entry in raw_data["items"]]
    stamp = datetime.now(timezone.utc).isoformat()
    result: dict[str, Any] = {
        "items": shouted,
        "processed_at": stamp,
        "source_count": raw_data["count"],
    }

    # Keep a copy on disk next to the working directory.
    out_dir = Path("./data")
    out_dir.mkdir(exist_ok=True)
    target = out_dir / "processed.json"
    with target.open("w") as handle:
        json.dump(result, handle, indent=2)

    print(f"✓ Processed and stored {len(result['items'])} items")
    return result


# ### Step 3: Create Analytics
#
# This asset demonstrates **chained dependencies** (it depends on `processed_data`, which depends on `raw_data`)
# and **artifact creation** for rich observability in the Prefect UI.
#
# In the full implementation, this runs dbt transformations to create analytics models.


@materialize(analytics_asset)
def create_analytics(processed_data: dict[str, Any]) -> dict[str, Any]:
    """Summarize the processed data and publish the summary as a markdown artifact.

    Args:
        processed_data: Mapping that provides a sized ``items`` collection and
            a ``processed_at`` value.

    Returns:
        A dict with ``total_items`` (length of ``items``),
        ``source_timestamp`` (copied from ``processed_at``) and ``created_at``
        (the current UTC time as an ISO-8601 string).
    """
    print("Creating analytics...")

    item_total = len(processed_data["items"])
    upstream_stamp = processed_data["processed_at"]
    generated_at = datetime.now(timezone.utc).isoformat()

    # The artifact is registered under the fixed key "analytics-summary".
    create_markdown_artifact(
        key="analytics-summary",
        markdown=dedent(
            f"""
            # Analytics Summary

            - **Total Items**: {item_total}
            - **Created**: {generated_at}
            - **Source**: {upstream_stamp}

            This artifact appears in the Prefect UI for observability.
            """
        ),
        description="Analytics summary for this pipeline run",
    )

    print(f"✓ Analytics created for {item_total} items")
    return {
        "total_items": item_total,
        "source_timestamp": upstream_stamp,
        "created_at": generated_at,
    }


# ## Flow: Orchestrate Asset Materialization
#
# The flow calls each asset function, and Prefect handles:
# - Dependency resolution (ensuring correct execution order)
# - Automatic caching (skip re-computation if upstream assets haven't changed)
# - Observability (tracking lineage and execution in the UI)


@flow(name="asset-pipeline-demo", log_prints=True)
def run_asset_pipeline() -> dict[str, Any]:
    """
    Run the three asset materializations in order.

    Calls ``fetch_raw_data``, feeds its result to ``process_data``, and feeds
    that result to ``create_analytics``, so each step receives the output of
    the one before it.

    Returns:
        The analytics dict produced by ``create_analytics``.
    """
    print("🚀 Starting asset pipeline")

    # Inner calls run first: fetch -> process -> analytics.
    summary = create_analytics(process_data(fetch_raw_data()))

    print(f"✅ Pipeline complete! Processed {summary['total_items']} items")
    return summary


if __name__ == "__main__":
    # Execute the flow once when this file is run as a script.
    run_asset_pipeline()

# ## What Makes Assets Powerful?
#
# 1. **Automatic Dependency Tracking**
#    - Prefect infers dependencies from function parameters
#    - Ensures correct execution order without manual DAG definition
#    - Tracks asset lineage for observability
#
# 2. **Caching and Versioning**
#    - Assets are versioned based on their inputs
#    - Skip re-computation when upstream data hasn't changed
#    - Efficient incremental processing
#
# 3. **Storage Integration**
#    - Asset keys can be S3 paths, database URIs, or any identifier
#    - Built-in support for `prefect-aws`, `prefect-gcp`, etc.
#    - Automatic data persistence and retrieval
#
# 4. **Observability**
#    - Every materialization tracked in the Prefect UI
#    - Artifacts provide rich context (tables, markdown, links)
#    - Full lineage and execution history
#
# 5. **Production Ready**
#    - Built-in retry logic and error handling
#    - Scheduling and automation via Prefect deployments
#    - Scales from local development to cloud production
#
# ## Full Implementation
#
# This example demonstrates the core patterns. The complete implementation includes:
# - Real ATProto API integration
# - S3-backed asset storage with partitioning
# - dbt transformations with DuckDB
# - Streamlit dashboard for visualization
# - Production-ready error handling and logging
#
# See the full project at: https://github.com/zzstoatzz/atproto-dashboard
#
# ## Learn More
#
# - [Prefect Assets Documentation](https://docs.prefect.io/v3/concepts/assets)
# - [prefect-aws Integration](https://docs.prefect.io/integrations/prefect-aws)
